Improve intra-target parallelism
authorAlex Crichton <alex@alexcrichton.com>
Fri, 11 Jul 2014 14:50:24 +0000 (07:50 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Fri, 11 Jul 2014 14:50:24 +0000 (07:50 -0700)
With cross compilation soon on the horizon, it may be required to build a
library for both the host and target architectures. These two copies can
certainly be built in parallel. Additionally, all binaries produced by a package
can also be built in parallel, but are currently forced to be built
sequentially.

This commit improves this parallelism by allowing each job to create more work
before the package is considered built. Only after all targets have been built
is the new fingerprint written.

src/cargo/ops/cargo_rustc.rs

index 6f5acc64620cffe5ce2bbb7fcc7036ac6e8a1e3e..2ec1626bfb241ec1150deca2bb7216608d3f8f76 100644 (file)
@@ -1,9 +1,11 @@
+use std::collections::HashMap;
 use std::hash::Hasher;
 use std::hash::sip::SipHasher;
 use std::io::{File, IoError};
 use std::io;
 use std::os::args;
 use std::str;
+use std::sync::{atomics, Arc};
 use term::color::YELLOW;
 
 use core::{Package, PackageSet, Target, Resolve};
@@ -24,7 +26,20 @@ struct Context<'a, 'b> {
     dylib: (String, String)
 }
 
-type Job = proc():Send -> CargoResult<()>;
+enum Job {
+    Work(proc():Send -> CargoResult<Vec<Job>>),
+}
+
+impl Job {
+    fn all(jobs: Vec<Job>, after: Vec<Job>) -> Job {
+        Work(proc() {
+            for Work(job) in jobs.move_iter() {
+                try!(job());
+            }
+            Ok(after)
+        })
+    }
+}
 
 // This is a temporary assert that ensures the consistency of the arguments
 // given the current limitations of Cargo. The long term fix is to have each
@@ -112,23 +127,23 @@ pub fn compile_targets<'a>(env: &str, targets: &[&Target], pkg: &Package,
             }
         }).collect::<Vec<&Target>>();
 
-        jobs.push((dep, try!(compile(targets.as_slice(), dep, &mut cx))));
+        try!(compile(targets.as_slice(), dep, &mut cx, &mut jobs));
     }
 
     cx.primary = true;
     cx.dest = &target_dir;
-    jobs.push((pkg, try!(compile(targets, pkg, &mut cx))));
+    try!(compile(targets, pkg, &mut cx, &mut jobs));
 
     // Now that we've figured out everything that we're going to do, do it!
     execute(cx.config, jobs)
 }
 
-fn compile(targets: &[&Target], pkg: &Package,
-           cx: &mut Context) -> CargoResult<(Freshness, Job)> {
+fn compile<'a>(targets: &[&Target], pkg: &'a Package, cx: &mut Context,
+               jobs: &mut Vec<(&'a Package, Freshness, Job)>) -> CargoResult<()> {
     debug!("compile_pkg; pkg={}; targets={}", pkg, targets);
 
     if targets.is_empty() {
-        return Ok((Fresh, proc() Ok(())))
+        return Ok(())
     }
 
     // First check to see if this package is fresh.
@@ -141,43 +156,77 @@ fn compile(targets: &[&Target], pkg: &Package,
     // This is not quite accurate, we should only trigger forceful
     // recompilations for downstream dependencies of ourselves, not everyone
     // compiled afterwards.a
-    //
-    // TODO: Figure out how this works with targets
     let fingerprint_loc = cx.dest.join(format!(".{}.fingerprint",
                                                pkg.get_name()));
 
     let (is_fresh, fingerprint) = try!(is_fresh(pkg, &fingerprint_loc, cx,
                                                 targets));
 
-    let mut cmds = Vec::new();
-
+    // First part of the build step of a target is to execute all of the custom
+    // build commands.
+    //
     // TODO: Should this be on the target or the package?
+    let mut build_cmds = Vec::new();
     for build_cmd in pkg.get_manifest().get_build().iter() {
-        cmds.push(compile_custom(pkg, build_cmd.as_slice(), cx));
+        build_cmds.push(compile_custom(pkg, build_cmd.as_slice(), cx));
     }
 
     // After the custom command has run, execute rustc for all targets of our
     // package.
+    //
+    // Note that bins can all be built in parallel because they all depend on
+    // one another, but libs must be built sequentially because they may have
+    // interdependencies.
+    let mut libs = Vec::new();
+    let mut bins = Vec::new();
     for &target in targets.iter() {
-        cmds.push(rustc(pkg, target, cx));
+        let job = rustc(pkg, target, cx);
+        if target.is_lib() {
+            libs.push(job);
+        } else {
+            bins.push(job);
+        }
     }
 
-    cmds.push(proc() {
-        // If this job runs, then everything has successfully compiled, so write
-        // our new fingerprint to the relevant location to prevent
-        // recompilations in the future.
-        try!(File::create(&fingerprint_loc).write_str(fingerprint.as_slice()));
-        Ok(())
-    });
+    // Only after all the binaries have been built can we actually write the
+    // fingerprint. Currently fingerprints are transactionally done per package,
+    // not per-target.
+    //
+    // TODO: Can a fingerprint be per-target instead of per-package? Doing so
+    //       would likely involve altering the granularity of key for the
+    //       dependency queue that is later used to run jobs.
+    let state = Arc::new(atomics::AtomicUint::new(bins.len()));
+    let write_fingerprint = || {
+        let (my_state, fingerprint_loc, fingerprint) =
+            (state.clone(), fingerprint_loc.clone(), fingerprint.clone());
+        Work(proc() {
+            if my_state.load(atomics::SeqCst) == 0 {
+                let mut file = try!(File::create(&fingerprint_loc));
+                try!(file.write_str(fingerprint.as_slice()));
+            }
+            Ok(Vec::new())
+        })
+    };
 
-    // TODO: this job itself may internally be parallel, but we're hiding that
-    //       currently. How to expose the parallelism among a single target?
-    Ok((if is_fresh {Fresh} else {Dirty}, proc() {
-        for cmd in cmds.move_iter() {
-            try!(cmd());
-        }
-        Ok(())
-    }))
+    // Note that we build the job backwards because each job will produce more
+    // work.
+    let build_libs = if bins.len() == 0 {
+        Job::all(libs, vec![write_fingerprint()])
+    } else {
+        Job::all(libs, bins.move_iter().map(|Work(bin)| {
+            let my_state = state.clone();
+            let write = write_fingerprint();
+            Work(proc() {
+                try!(bin());
+                my_state.fetch_sub(1, atomics::SeqCst);
+                Ok(vec![write])
+            })
+        }).collect())
+    };
+    let job = Job::all(build_cmds, vec![build_libs]);
+
+    jobs.push((pkg, if is_fresh {Fresh} else {Dirty}, job));
+    Ok(())
 }
 
 fn is_fresh(dep: &Package, loc: &Path,
@@ -229,7 +278,10 @@ fn compile_custom(pkg: &Package, cmd: &str,
     for arg in cmd {
         p = p.arg(arg);
     }
-    proc() p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human())
+    Work(proc() {
+        try!(p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human()));
+        Ok(Vec::new())
+    })
 }
 
 fn rustc(package: &Package, target: &Target, cx: &mut Context) -> Job {
@@ -245,19 +297,22 @@ fn rustc(package: &Package, target: &Target, cx: &mut Context) -> Job {
 
     log!(5, "command={}", rustc);
 
-    let _ = cx.config.shell().verbose(|shell| shell.status("Running", rustc.to_string()));
+    let _ = cx.config.shell().verbose(|shell| {
+        shell.status("Running", rustc.to_string())
+    });
 
-    proc() {
+    Work(proc() {
         if primary {
             log!(5, "executing primary");
-            rustc.exec().map_err(|err| human(err.to_string()))
+            try!(rustc.exec().map_err(|err| human(err.to_string())))
         } else {
             log!(5, "executing deps");
-            rustc.exec_with_output().and(Ok(())).map_err(|err| {
+            try!(rustc.exec_with_output().and(Ok(())).map_err(|err| {
                 human(err.to_string())
-            })
+            }))
         }
-    }
+        Ok(Vec::new())
+    })
 }
 
 fn prepare_rustc(package: &Package, target: &Target, crate_types: Vec<&str>,
@@ -389,29 +444,31 @@ fn dep_targets(pkg: &Package, cx: &Context) -> Vec<Target> {
 /// necessary dependencies, in order. Freshness is propagated as far as possible
 /// along each dependency chain.
 fn execute(config: &mut Config,
-           jobs: Vec<(&Package, (Freshness, Job))>) -> CargoResult<()> {
+           jobs: Vec<(&Package, Freshness, Job)>) -> CargoResult<()> {
     let pool = TaskPool::new(config.jobs());
     let (tx, rx) = channel();
     let mut queue = DependencyQueue::new();
-    for &(pkg, _) in jobs.iter() {
+    for &(pkg, _, _) in jobs.iter() {
         queue.register(pkg);
     }
-    for (pkg, (fresh, job)) in jobs.move_iter() {
+    for (pkg, fresh, job) in jobs.move_iter() {
         queue.enqueue(pkg, fresh, (pkg, job));
     }
 
     // Iteratively execute the dependency graph. Each turn of this loop will
     // schedule as much work as possible and then wait for one job to finish,
     // possibly scheduling more work afterwards.
-    let mut active = 0i;
+    let mut active = HashMap::new();
     while queue.len() > 0 {
         loop {
             match queue.dequeue() {
                 Some((name, Fresh, (pkg, _))) => {
+                    assert!(active.insert(name.clone(), 1u));
                     try!(config.shell().status("Fresh", pkg));
-                    tx.send((name, Fresh, Ok(())));
+                    tx.send((name, Fresh, Ok(Vec::new())));
                 }
-                Some((name, Dirty, (pkg, job))) => {
+                Some((name, Dirty, (pkg, Work(job)))) => {
+                    assert!(active.insert(name.clone(), 1));
                     try!(config.shell().status("Compiling", pkg));
                     let my_tx = tx.clone();
                     pool.execute(proc() my_tx.send((name, Dirty, job())));
@@ -423,11 +480,28 @@ fn execute(config: &mut Config,
         // Now that all possible work has been scheduled, wait for a piece of
         // work to finish. If any package fails to build then we stop scheduling
         // work as quickly as possibly.
-        active -= 1;
-        match rx.recv() {
-            (name, fresh, Ok(())) => queue.finish(&name, fresh),
-            (_, _, Err(e)) => {
-                if active > 0 && config.jobs() > 1 {
+        let (name, fresh, result) = rx.recv();
+        *active.get_mut(&name) -= 1;
+        match result {
+            Ok(v) => {
+                for Work(job) in v.move_iter() {
+                    *active.get_mut(&name) += 1;
+                    let my_tx = tx.clone();
+                    let my_name = name.clone();
+                    pool.execute(proc() {
+                        my_tx.send((my_name, fresh, job()));
+                    });
+                }
+                if *active.get(&name) == 0 {
+                    active.remove(&name);
+                    queue.finish(&name, fresh);
+                }
+            }
+            Err(e) => {
+                if *active.get(&name) == 0 {
+                    active.remove(&name);
+                }
+                if active.len() > 0 && config.jobs() > 1 {
                     try!(config.shell().say("Build failed, waiting for other \
                                              jobs to finish...", YELLOW));
                     for _ in rx.iter() {}